package com.rookie.opcua.task;

import com.rookie.opcua.client.ClientGen;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

/**
 * @author yugo
 */
@Component
@Slf4j
public class OpcUaSubscriptionTask {

    private final AtomicLong clientHandles = new AtomicLong(1L);

    private boolean fType = false;
    @Scheduled(cron = "0/30 * * * * ?")
    public void subscription() {
        try {
            if(!fType) {
                log.info("开始监听");
                OpcUaClient client = ClientGen.opcUaClient;
                //创建发布间隔1000ms的订阅对象
                UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();

                // 定义需要监听的多个节点（可以根据实际情况添加更多节点）
                List<ReadValueId> readValueIds = Arrays.asList(
                        new ReadValueId(NodeId.parse("ns=3;i=1009"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE), // 监控变量1
                        new ReadValueId(NodeId.parse("ns=3;i=1010"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE), // 监控变量2
                        new ReadValueId(NodeId.parse("ns=3;i=1011"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE),  // 监控变量2
                        new ReadValueId(NodeId.parse("ns=3;i=1012"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE)  // 监控变量2
                );


                // 为每个变量创建一个监控项请求
                List<MonitoredItemCreateRequest> requests = new ArrayList<>();
                for (ReadValueId readValueId : readValueIds) {
                    UInteger clientHandle = uint(clientHandles.getAndIncrement());
                    MonitoringParameters parameters = new MonitoringParameters(
                            clientHandle,
                            1000.0,  // 监控间隔1000ms
                            null,     // 使用默认过滤器
                            uint(10), // 队列大小为10
                            true      // 丢弃旧配置
                    );

                    MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
                            readValueId, MonitoringMode.Reporting, parameters);
                    requests.add(request);
                }

                // 监听服务当前时间节点
                ReadValueId readValueId = new ReadValueId(
                        NodeId.parse("ns=3;i=1009"),
                        AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);

                // 每个项目的客户端句柄必须是唯一的
                UInteger clientHandle = uint(clientHandles.getAndIncrement());

                MonitoringParameters parameters = new MonitoringParameters(
                        clientHandle,
                        // 间隔
                        1000.0,
                        // 过滤器空表示使用默认值
                        null,
                        // 队列大小
                        uint(10),
                        // 是否丢弃旧配置
                        true
                );

                MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
                        readValueId, MonitoringMode.Reporting, parameters);

                // 创建监控项，并且注册变量值改变时候的回调函数。
                BiConsumer<UaMonitoredItem, Integer> onItemCreated =
                        (item, id) -> item.setValueConsumer(this::onSubscriptionValue);

                List<UaMonitoredItem> items = subscription.createMonitoredItems(
                        TimestampsToReturn.Both,
                        requests,
                        onItemCreated
                ).get();

                for (UaMonitoredItem item : items) {
                    if (item.getStatusCode().isGood()) {
                        fType = true;
                        log.info("item created for nodeId={}", item.getReadValueId().getNodeId());
                    } else {
                        log.warn(
                                "failed to create item for nodeId={} (status={})",
                                item.getReadValueId().getNodeId(), item.getStatusCode());
                    }
                }

                // 运行五秒然后停止
                Thread.sleep(5000);
            }
        } catch (Exception e) {
            log.error("订阅失败" + e.getMessage());
        }
    }

    private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
        log.info(
                "subscription value received: item={}, value={}",
                item.getReadValueId().getNodeId(), value.getValue());
    }
}
